Skip to main content
Version: Next

Celebrity Quote Analysis with Azure AI Services

from synapse.ml.services import *
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import SQLTransformer
from synapse.ml.core.platform import find_secret

# Azure AI Services credentials -- replace both values with your own.
# The region is a plain string; the key is looked up with find_secret using
# the secret name and key vault given below.
ai_service_location = "eastus"
ai_service_key = find_secret(
    keyvault="mmlspark-build-keys",
    secret_name="ai-services-api-key",
)

Setting up celebrity quote images

Here we set up a static list of celebrity image URLs for analysis.

# Fixed set of sample celebrity quote images used as pipeline input.
celebrity_image_urls = [
    "https://mmlspark.blob.core.windows.net/datasets/DSIR/test2.jpg",
    "https://mmlspark.blob.core.windows.net/datasets/DSIR/test1.jpg",
]

# zip() over a single list yields one 1-tuple per URL, i.e. one row per image
# in a single-column DataFrame whose column is named "url".
urlDF = spark.createDataFrame(list(zip(celebrity_image_urls)), schema=["url"])

Recognizing Images of Celebrities

This block identifies the celebrities pictured in each image and extracts the name of the first one recognized.

# Image-analysis stage configured with the "celebrities" domain model.
# It reads image URLs from the "url" column and writes the service's
# structured response to the "celebs" column, using the service key and
# region assigned to ai_service_key / ai_service_location.
celebs = (
RecognizeDomainSpecificContent()
.setSubscriptionKey(ai_service_key)
.setLocation(ai_service_location)
.setModel("celebrities")
.setImageUrlCol("url")
.setOutputCol("celebs")
)

# Extract the first celebrity we see from the structured response.
# The statement keeps every existing column (SELECT *) and adds a
# "firstCeleb" column taken from celebs.result.celebrities[0].name.
# NOTE(review): what happens when no celebrity is recognized (empty array)
# is not visible here -- presumably the column is null; confirm.
firstCeleb = SQLTransformer(
statement="SELECT *, celebs.result.celebrities[0].name as firstCeleb FROM __THIS__"
)

Reading the Quote from the Image

This stage performs OCR on the images to recognize the quotes.

from synapse.ml.stages import UDFTransformer

# OCR stage: reads image URLs from the "url" column and writes the
# structured recognition response to the "ocr" column, with the mode set
# to "Printed".
# NOTE(review): setConcurrency(5) presumably caps the number of concurrent
# service requests -- confirm against the SynapseML documentation.
recognizeText = (
RecognizeText()
.setSubscriptionKey(ai_service_key)
.setLocation(ai_service_location)
.setImageUrlCol("url")
.setMode("Printed")
.setOutputCol("ocr")
.setConcurrency(5)
)


def getTextFunction(ocrRow):
    """Flatten a structured OCR result into a single newline-joined string.

    Args:
        ocrRow: The structured OCR output for one image. It is expected to
            expose ``recognitionResult.lines``, where each line has a
            ``text`` attribute. May be ``None``.

    Returns:
        The text of every recognized line joined with ``"\\n"`` (an empty
        string when there are no lines), or ``None`` when ``ocrRow``, its
        ``recognitionResult``, or the ``lines`` collection is missing.
    """
    if ocrRow is None:
        return None

    result = ocrRow.recognitionResult
    # A response without a recognition result (or without lines) would
    # otherwise raise inside the UDF and fail the whole job; treat it the
    # same way as a missing row.
    if result is None or result.lines is None:
        return None

    return "\n".join(line.text for line in result.lines)


# This transformer will extract a simpler string from the structured output
# of recognize text: it applies getTextFunction to the "ocr" column and
# writes the result to the "text" column.
# No return type is passed to udf(), so PySpark's default return type is used.
getText = (
UDFTransformer()
.setUDF(udf(getTextFunction))
.setInputCol("ocr")
.setOutputCol("text")
)

Understanding the Sentiment of the Quote

# Sentiment-analysis stage: reads the quote text from the "text" column and
# writes the service's structured response to the "sentiment" column.
sentimentTransformer = (
TextSentiment()
.setLocation(ai_service_location)
.setSubscriptionKey(ai_service_key)
.setTextCol("text")
.setOutputCol("sentiment")
)

# Extract the sentiment label from the API response body.
# The statement keeps every existing column (SELECT *) and adds a
# "sentimentLabel" column taken from sentiment.document.sentiment.
getSentiment = SQLTransformer(
statement="SELECT *, sentiment.document.sentiment as sentimentLabel FROM __THIS__"
)

Tying it all together

Now that we have built the stages of our pipeline, it's time to chain them together into a single model that can be used to process batches of incoming data.

from synapse.ml.stages import SelectColumns

# Select the final columns: the image URL, the first recognized celebrity,
# the extracted quote text, and the sentiment label. The intermediate
# structured columns ("celebs", "ocr", "sentiment") are not in this list.
cleanupColumns = SelectColumns().setCols(
["url", "firstCeleb", "text", "sentimentLabel"]
)

# Chain the stages into one model. The PipelineModel is constructed directly
# from the stage list; no fit step appears here.
# Order matters: each extraction stage follows the stage that produces the
# column it reads (celebs -> firstCeleb, ocr -> text -> sentiment ->
# sentimentLabel), and the column selection runs last.
celebrityQuoteAnalysis = PipelineModel(
stages=[
celebs,
firstCeleb,
recognizeText,
getText,
sentimentTransformer,
getSentiment,
cleanupColumns,
]
)

# Run the full pipeline over the image URLs and print up to 5 result rows.
celebrityQuoteAnalysis.transform(urlDF).show(5)